package com.dahuan.tables;

import com.dahuan.bean.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Table_TimeAndWindow_ProcessingTime {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );
        String path = "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";
        // 2. 读入文件数据，得到DataStream
        DataStream<String> inputStream = env.readTextFile(path);

        // 3. 转换成POJO
        DataStream<SensorReading> dataStream = inputStream.map( line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        //4.将流转换成表，定义时间特性
        Table dataTable = tableEnv.fromDataStream( dataStream,"id, timestamp as ts, temperature as temp, pt.proctime" );


        dataTable.printSchema();
        tableEnv.toAppendStream( dataTable, Row.class ).print();


        env.execute("Table_TimeAndWindow");
    }
}
